from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array_contains,explode, udf, get_json_object, expr,explode_outer, json_tuple
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType, FloatType
import json
from shapely.geometry import Point, MultiPolygon
# Single SparkSession shared by the whole notebook/script.
spark = SparkSession.builder.appName("TaxiAnalyzer").getOrCreate()
Shell commands used to stage the input files in HDFS (run outside Python):
./hdfs dfs -copyFromLocal /home/ubuntu/Desktop/201902-citibike-tripdata.csv /HW2_tripdata
./hdfs dfs -copyFromLocal /home/ubuntu/Desktop/NYCTaxiZones.geojson /HW2_taxizones
Для более быстрого считывания определяю схему данных:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# Explicit schema for the Citi Bike trip CSV: avoids a slow inferSchema pass
# and pins every column to a known type.
_trip_fields = [
    ("tripduration", IntegerType()),
    ("starttime", TimestampType()),
    ("stoptime", TimestampType()),
    ("start station id", StringType()),
    ("start station name", StringType()),
    ("start station latitude", DoubleType()),
    ("start station longitude", DoubleType()),
    ("end station id", StringType()),
    ("end station name", StringType()),
    ("end station latitude", DoubleType()),
    ("end station longitude", DoubleType()),
    ("bikeid", IntegerType()),
    ("usertype", StringType()),
    ("birth year", IntegerType()),
    ("gender", IntegerType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in _trip_fields])

trip_data = spark.read.csv("/HW2_tripdata", header=True, schema=schema)
Удаляю ненужные столбцы
# Columns that are irrelevant for the station-traffic analysis.
columns_to_drop = ("tripduration", "bikeid", "usertype", "birth year", "gender")
trip_data = trip_data.drop(*columns_to_drop)

# Total trips started at each station.
start_count = (
    trip_data
    .groupBy("start station id")
    .count()
    .withColumnRenamed("count", "start_count")
)
start_count.show()

# Total trips ended at each station.
end_count = (
    trip_data
    .groupBy("end station id")
    .count()
    .withColumnRenamed("count", "end_count")
)
end_count.show()
Не смог адекватно спарсить GeoJSON с мультиполигонами. Кажется, для этого идеально подошла бы библиотека geopandas. Поэтому спарсил только properties и geometry каждой фичи.
# Read the taxi-zone GeoJSON as one text blob and parse it with the json module
# (spark.read.json could not keep the nested MultiPolygon structure intact).
nyc_geojson_content = spark.sparkContext.wholeTextFiles("/HW2_taxizones").values().collect()[0]
nyc_geojson = json.loads(nyc_geojson_content)

rdd = spark.sparkContext.parallelize(nyc_geojson['features'])

# Keep only what we need from each feature: the raw polygon coordinates and
# the descriptive properties (all properties arrive as strings).
_zone_properties = ("shape_area", "objectid", "shape_leng", "location_id", "zone", "borough")
schema_zones = StructType([
    StructField(
        "geometry",
        StructType([
            # MultiPolygon nesting: polygons -> rings -> points -> lon/lat pair.
            StructField("coordinates", ArrayType(ArrayType(ArrayType(ArrayType(DoubleType())))), True),
        ]),
        True,
    ),
    StructField(
        "properties",
        StructType([StructField(prop, StringType(), True) for prop in _zone_properties]),
        True,
    ),
])

zones_df = spark.createDataFrame(rdd, schema=schema_zones)
zones_df.printSchema()
Во избежание сложных операций по передаче zones_df в udf использую broadcast на узлы кластера.
# Build the zone lookup once on the driver and broadcast it, so every executor
# gets a single read-only copy instead of shipping zones_df into the UDF.
# PERFORMANCE FIX: construct each shapely MultiPolygon ONCE here; the original
# rebuilt it for every row x every zone inside the UDF.
zones_dict = {
    row['properties']['location_id']: MultiPolygon(row['geometry']['coordinates'])
    for row in zones_df.collect()
}
from pyspark.sql.functions import broadcast  # kept from the original; unused here
zones_dict_bc = spark.sparkContext.broadcast(zones_dict)

def find_zone_udf(lat, lon):
    """Return the location_id of the taxi zone containing (lat, lon).

    Returns the sentinel string below when the point falls outside every zone.
    """
    point = Point(lon, lat)  # shapely uses (x=lon, y=lat) order
    for location_id, multi_polygon in zones_dict_bc.value.items():
        if point.within(multi_polygon):
            return location_id
    return "Не найдено подходящей зоны"

find_zone = udf(find_zone_udf, StringType())
# Re-read the trips and tag each one with its start/end taxi zone.
trip_data = spark.read.csv("/HW2_tripdata", header=True, schema=schema)
trip_data = trip_data.limit(10000)  # keep the Python-UDF workload manageable

trip_data = (
    trip_data
    .withColumn("start_zone", find_zone("start station latitude", "start station longitude"))
    .withColumn("end_zone", find_zone("end station latitude", "end station longitude"))
)

# Trips per zone, then attach each zone's geometry for plotting.
start_zone_counts = trip_data.groupBy("start_zone").count()
end_zone_counts = trip_data.groupBy("end_zone").count()

start_zone_info = start_zone_counts.join(
    zones_df,
    start_zone_counts.start_zone == zones_df.properties.location_id,
    "inner",
)
end_zone_info = end_zone_counts.join(
    zones_df,
    end_zone_counts.end_zone == zones_df.properties.location_id,
    "inner",
)

start_zone_info = start_zone_info.select("start_zone", "count", "geometry.coordinates")
end_zone_info = end_zone_info.select("end_zone", "count", "geometry.coordinates")

# Large broadcast joins can exceed the default 300 s timeout.
spark.conf.set("spark.sql.broadcastTimeout", "3600")

start_zone_info = start_zone_info.orderBy(F.desc("count")).cache()
start_zone_info.show()
end_zone_info = end_zone_info.orderBy(F.desc("count")).cache()
end_zone_info.show()
#df = spark.read.json("/HW2_taxizones")
#df.printSchema()
К сожалению, не смог корректно спарсить этот JSON в Spark DataFrame, сохранив исходную структуру. Поэтому читаю его локально для построения мультиполигонов.
# Load the GeoJSON locally as well: plotly needs the original feature structure
# (with intact MultiPolygon geometries) that the Spark schema above flattened.
# FIX: pass an explicit encoding instead of relying on the platform default.
with open('/home/ubuntu/Desktop/NYCTaxiZones.geojson', encoding='utf-8') as f:
    nyc_geojson = json.load(f)
import plotly.express as px

# BUG FIX: plotly.express operates on pandas DataFrames (or dict/array-like
# data), not on Spark DataFrames. The aggregated zone tables are small, so
# convert them before plotting.
end_zone_pd = end_zone_info.toPandas()
start_zone_pd = start_zone_info.toPandas()

# Choropleth of trip ENDS per taxi zone.
fig = px.choropleth_mapbox(
    end_zone_pd,
    geojson=nyc_geojson,
    locations='end_zone',
    color='count',
    mapbox_style="carto-positron",
    zoom=10.3,
    center={"lat": 40.75594159, "lon": -74.0021163},
    opacity=0.5,
    # Match the 'end_zone' values against each feature's location_id.
    featureidkey="properties.location_id",
)
fig.show()

# Choropleth of trip STARTS per taxi zone.
fig = px.choropleth_mapbox(
    start_zone_pd,
    geojson=nyc_geojson,
    locations='start_zone',
    color='count',
    mapbox_style="carto-positron",
    zoom=10.3,
    center={"lat": 40.75594159, "lon": -74.0021163},
    opacity=0.5,
    featureidkey="properties.location_id",
)
fig.show()
# Re-read the full trip dataset (the zone analysis above truncated it to 10k rows).
trip_data = spark.read.csv("/HW2_tripdata", header=True, schema=schema)
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import math
# Haversine distance between two (lat, lon) points, in metres.
def calculate_distance(lat1, lon1, lat2, lon2):
    """Great-circle (haversine) distance between two points, in metres.

    Arguments are latitudes/longitudes in decimal degrees.
    """
    earth_radius_km = 6371.0
    phi1 = math.radians(lat1)
    lam1 = math.radians(lon1)
    phi2 = math.radians(lat2)
    lam2 = math.radians(lon2)
    # Haversine formula: a is the squared half-chord length between the points.
    half_dphi = (phi2 - phi1) / 2
    half_dlam = (lam2 - lam1) / 2
    a = math.sin(half_dphi) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlam) ** 2
    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_km * central_angle * 1000  # km -> m
distance_udf = F.udf(calculate_distance, DoubleType())

# Drop "loop" trips whose start and end coordinates are identical.
# (Kept as != / | rather than a negated conjunction: SQL three-valued logic
# treats NULL coordinates the same way the original did.)
trip_data_filtered = trip_data.filter(
    (F.col("start station latitude") != F.col("end station latitude"))
    | (F.col("start station longitude") != F.col("end station longitude"))
)

# Attach the haversine distance of every remaining trip.
trip_data_with_distance = trip_data_filtered.withColumn(
    "distance",
    distance_udf(
        F.col("start station latitude"),
        F.col("start station longitude"),
        F.col("end station latitude"),
        F.col("end station longitude"),
    ),
)

# All distance statistics in one aggregation pass.
stats = trip_data_with_distance.select(
    F.max("distance").alias("max_distance"),
    F.mean("distance").alias("mean_distance"),
    F.stddev("distance").alias("stddev_distance"),
    F.expr("percentile(distance, 0.5)").alias("median_distance"),  # median
).collect()

max_distance, mean_distance, stddev_distance, median_distance = stats[0]
print(f"Максимальное расстояние: {max_distance} м")
print(f"Среднее расстояние: {mean_distance} м")
print(f"Стандартное отклонение: {stddev_distance} м")
print(f"Медиана расстояний: {median_distance} м")
from pyspark.sql.functions import col, hour, dayofweek, date_format, to_date
from pyspark.sql.types import TimestampType
# Convert the raw timestamps to calendar dates so trips can be grouped per day.
trip_data = trip_data.withColumn("start_date", to_date("starttime")) \
    .withColumn("stop_date", to_date("stoptime"))

# Trips per station per day, separately for starts and stops.
start_counts = trip_data.groupBy("start station id", "start_date") \
    .count() \
    .withColumnRenamed("count", "start_count") \
    .withColumnRenamed("start station id", "station_id")
stop_counts = trip_data.groupBy("end station id", "stop_date") \
    .count() \
    .withColumnRenamed("count", "stop_count") \
    .withColumnRenamed("end station id", "station_id")

# Full outer join keeps station/day pairs that appear on only one side.
# BUG FIX: after an outer join, start_counts.station_id (and start_date) are
# NULL for rows that exist only in stop_counts, so selecting them collapsed
# all stop-only rows into a NULL station. Coalesce both key columns instead.
daily_counts = start_counts.join(
    stop_counts,
    (start_counts.station_id == stop_counts.station_id)
    & (start_counts.start_date == stop_counts.stop_date),
    "outer",
).select(
    F.coalesce(start_counts.station_id, stop_counts.station_id).alias("station_id"),
    F.coalesce(start_counts.start_date, stop_counts.stop_date).alias("start_date"),
    "start_count",
    "stop_count",
)
# Missing start/stop counts mean zero trips on that side.
daily_counts = daily_counts.na.fill(0)

# Average daily starts/stops per station.
average_counts = daily_counts.groupBy("station_id") \
    .agg(F.avg("start_count").alias("avg_daily_starts"),
         F.avg("stop_count").alias("avg_daily_stops"))
average_counts.show(10)
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
# Label an hour of the day with a coarse time-of-day bucket.
def get_time_of_day(hour):
    """Return 'Morning' (6-11), 'Afternoon' (12-17), 'Evening' (18-23), else 'Night'."""
    if hour < 6:
        return 'Night'
    if hour < 12:
        return 'Morning'
    if hour < 18:
        return 'Afternoon'
    if hour < 24:
        return 'Evening'
    return 'Night'
udf_time_of_day = F.udf(get_time_of_day, StringType())

# Tag every trip with its date and its time-of-day bucket, for both the start
# and the end of the ride.
# BUG FIX: the original chain ended with a stray line-continuation backslash
# immediately before a comment line, which is a SyntaxError in Python — removed.
trip_data = trip_data.withColumn('start_date', F.to_date('starttime')) \
    .withColumn('stop_date', F.to_date('stoptime')) \
    .withColumn('start_time_of_day', udf_time_of_day(F.hour('starttime'))) \
    .withColumn('stop_time_of_day', udf_time_of_day(F.hour('stoptime')))
# Count trips per station per time-of-day bucket, for starts and ends,
# and normalise the column names so both sides can be joined on the same keys.
start_stations = (
    trip_data.groupBy("start station id", "start_time_of_day").count()
    .withColumnRenamed("start station id", "station_id")
    .withColumnRenamed("count", "start_count")
    .withColumnRenamed("start_time_of_day", "time_of_day")
)
end_stations = (
    trip_data.groupBy("end station id", "stop_time_of_day").count()
    .withColumnRenamed("end station id", "station_id")
    .withColumnRenamed("count", "end_count")
    .withColumnRenamed("stop_time_of_day", "time_of_day")
)

# Outer join keeps buckets with only starts or only ends; missing counts -> 0.
combined_data = start_stations.join(end_stations, ["station_id", "time_of_day"], "outer")
combined_data = combined_data.na.fill(0)

# NOTE(review): after the join each (station_id, time_of_day) pair is unique,
# so avg() over the group simply returns the count — kept as-is to preserve
# the original output exactly.
final_data = combined_data.groupBy("station_id", "time_of_day").agg(
    F.avg("start_count").alias("average_starts"),
    F.avg("end_count").alias("average_ends"),
)
final_data = final_data.orderBy("station_id", "time_of_day")
final_data.show()
# station_time_stats = final_data.groupBy("station_id", "time_of_day").agg(
# F.avg("average_starts").alias("avg_start_trips"),
# F.avg("average_ends").alias("avg_end_trips")
# ).orderBy("station_id", "time_of_day")
# station_time_stats.show()
# Enrich the per-station aggregates with coordinates pulled from the trip data
# (one distinct row per start station).
station_coords = (
    trip_data
    .select("start station id", "start station latitude", "start station longitude")
    .distinct()
    .alias("trip")
)
final_data_with_coords = final_data.alias("final").join(
    station_coords,
    col("final.station_id") == col("trip.start station id"),
    "inner",
)
# Small enough after aggregation to bring to the driver for plotting.
final_data_pandas = final_data_with_coords.toPandas()
import folium
from folium.plugins import HeatMapWithTime, HeatMap

# Animated heat map of average trip starts per time-of-day bucket.
# FIX: renamed the map variable so it no longer shadows the builtin `map`,
# and pass the bucket names as frame labels so the time slider shows
# 'Afternoon'/'Evening'/... instead of bare frame indices.
station_map = folium.Map(
    location=[
        final_data_pandas['start station latitude'].mean(),
        final_data_pandas['start station longitude'].mean(),
    ],
    zoom_start=13,
)

grouped_data = final_data_pandas.groupby('time_of_day')
time_labels = []
data_by_time = []
for time_of_day, group in grouped_data:
    time_labels.append(time_of_day)
    data_by_time.append(
        group[['start station latitude', 'start station longitude', 'average_starts']].values.tolist()
    )

HeatMapWithTime(data_by_time, index=time_labels).add_to(station_map)
station_map
Для вычисления завершений поездок достаточно поменять параметр группировки в цикле по grouped_data, но из-за уже большого объёма ноутбука я этот шаг пропущу.
использовал dayofweek чтобы получить номер дня недели ( по умолчанию 1 - воскресенье https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.dayofweek.html)
# Weekday number for the start and stop dates (Spark convention: 1 = Sunday).
for day_col, date_col in (("start_day_of_week", "start_date"), ("stop_day_of_week", "stop_date")):
    trip_data = trip_data.withColumn(day_col, dayofweek(date_col))
Достаточно применить фильтрацию к исходному датасету, и прогнать по такому же алгоритму
# Keep only trips that both start AND finish on Wednesday (4) or Sunday (1).
wanted_days = (4, 1)
trip_data = trip_data.filter(trip_data.start_day_of_week.isin(*wanted_days)) \
    .filter(trip_data.stop_day_of_week.isin(*wanted_days))

# Per-station, per-time-of-day trip counts on those two days.
start_stations = (
    trip_data.groupBy("start station id", "start_time_of_day").count()
    .withColumnRenamed("start station id", "station_id")
    .withColumnRenamed("count", "start_count")
    .withColumnRenamed("start_time_of_day", "time_of_day")
)
end_stations = (
    trip_data.groupBy("end station id", "stop_time_of_day").count()
    .withColumnRenamed("end station id", "station_id")
    .withColumnRenamed("count", "end_count")
    .withColumnRenamed("stop_time_of_day", "time_of_day")
)

# Outer join keeps one-sided buckets; missing counts become 0.
combined_data = start_stations.join(end_stations, ["station_id", "time_of_day"], "outer")
combined_data = combined_data.na.fill(0)

# avg over the now-unique (station, bucket) keys, then a stable ordering.
final_data = combined_data.groupBy("station_id", "time_of_day").agg(
    F.avg("start_count").alias("average_starts"),
    F.avg("end_count").alias("average_ends"),
)
final_data = final_data.orderBy("station_id", "time_of_day")
final_data.show()
# Same summary restricted to Wednesday-only trips (dayofweek == 4).
trip_data_wednesday = trip_data.filter(trip_data.start_day_of_week == 4) \
    .filter(trip_data.stop_day_of_week == 4)

start_stations = (
    trip_data_wednesday.groupBy("start station id", "start_time_of_day").count()
    .withColumnRenamed("start station id", "station_id")
    .withColumnRenamed("count", "start_count")
    .withColumnRenamed("start_time_of_day", "time_of_day")
)
end_stations = (
    trip_data_wednesday.groupBy("end station id", "stop_time_of_day").count()
    .withColumnRenamed("end station id", "station_id")
    .withColumnRenamed("count", "end_count")
    .withColumnRenamed("stop_time_of_day", "time_of_day")
)

# Outer join keeps one-sided buckets; missing counts become 0.
combined_data = start_stations.join(end_stations, ["station_id", "time_of_day"], "outer")
combined_data = combined_data.na.fill(0)

final_data = combined_data.groupBy("station_id", "time_of_day").agg(
    F.avg("start_count").alias("average_starts"),
    F.avg("end_count").alias("average_ends"),
)
final_data = final_data.orderBy("station_id", "time_of_day")
final_data.show()
# Same summary restricted to Sunday-only trips (dayofweek == 1).
trip_data_sunday = trip_data.filter(trip_data.start_day_of_week == 1) \
    .filter(trip_data.stop_day_of_week == 1)

start_stations = (
    trip_data_sunday.groupBy("start station id", "start_time_of_day").count()
    .withColumnRenamed("start station id", "station_id")
    .withColumnRenamed("count", "start_count")
    .withColumnRenamed("start_time_of_day", "time_of_day")
)
end_stations = (
    trip_data_sunday.groupBy("end station id", "stop_time_of_day").count()
    .withColumnRenamed("end station id", "station_id")
    .withColumnRenamed("count", "end_count")
    .withColumnRenamed("stop_time_of_day", "time_of_day")
)

# Outer join keeps one-sided buckets; missing counts become 0.
combined_data = start_stations.join(end_stations, ["station_id", "time_of_day"], "outer")
combined_data = combined_data.na.fill(0)

final_data = combined_data.groupBy("station_id", "time_of_day").agg(
    F.avg("start_count").alias("average_starts"),
    F.avg("end_count").alias("average_ends"),
)
final_data = final_data.orderBy("station_id", "time_of_day")
final_data.show()